package com.abyss.transformation;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;
/**
 * Flink batch (DataSet API) demo of the {@code filter} transformation: reads a
 * log file line by line and keeps only the lines whose first space-delimited
 * field equals a fixed client IP address.
 *
 * @author Abyss
 * @date 2020/10/4
 */
public class FilterDemo {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_LOG_PATH =
            "/Users/abyss/Dev/toys/flink/H-flink-learn/src/main/resources/apache.log";

    /** Lines are kept only when their first field is exactly this IP address. */
    private static final String TARGET_IP = "83.149.9.216";

    /**
     * Builds the job: read text file, filter by first field, print the result.
     *
     * @param args optional; {@code args[0]} overrides the input file path.
     *             With no arguments the built-in default path is used.
     * @throws Exception propagated from the Flink calls made while building
     *                   and printing the data set
     */
    public static void main(String[] args) throws Exception {
        // 1. Env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: allow the path to be overridden so the demo is not tied
        //    to one developer's machine; the default keeps the old behavior.
        String logPath = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;
        DataSource<String> fileSource = env.readTextFile(logPath);

        // 3. Filter: keep only the lines whose first field is the target IP.
        FilterOperator<String> result = fileSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return TARGET_IP.equals(firstField(value));
            }
        });

        // 4. Output
        result.print();
    }

    /**
     * Returns the text before the first space in {@code line}, or the whole
     * line when it contains no space.
     *
     * <p>This replaces {@code line.split(" ")[0]}: {@code split} discards
     * trailing empty strings, so a line consisting only of spaces yields an
     * empty array and indexing it throws
     * {@code ArrayIndexOutOfBoundsException}. Here such a line simply yields
     * the empty string.
     *
     * @param line one line of input text
     * @return the first space-delimited field, possibly empty
     */
    private static String firstField(String line) {
        int firstSpace = line.indexOf(' ');
        return firstSpace < 0 ? line : line.substring(0, firstSpace);
    }
}
